对java8 StreamApi的扩展

java8 的流(Stream API)已经提供了很强大的 api 接口,能满足日常开发的绝大部分需要,但还是有些欠缺,比如 java9 才增加的两个 api:takeWhile 和 dropWhile。

takeWhile() 方法使用一个断言作为参数,返回给定 Stream 的子集直到断言语句第一次返回 false。

dropWhile() 与 takeWhile() 相反,使用一个断言作为参数,丢弃开头连续满足断言的元素,直到断言语句第一次返回 false 才开始返回给定 Stream 中剩余的子集。

还有不爽的地方是同一个流只能执行一次流水线操作,完成后这个流就废了。要执行第二次流水线操作就必须重新创建流。原因可以看这个java8 Stream流水线实现分析

通过Spliterator的延迟绑定能力实现对java8 流的扩展

takeWhile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * Returns a sequential stream of the leading elements of {@code stream}, stopping at
 * (and excluding) the first element for which {@code predicate} returns {@code true}.
 *
 * <p>Note the polarity: elements are emitted while the predicate is <em>false</em>.
 * This is the opposite of Java 9's {@code Stream.takeWhile}, which emits while its
 * predicate is true.
 *
 * @param stream    source stream; it is consumed through its spliterator
 * @param predicate stop condition, tested once per element until it first matches
 * @param <T>       element type
 * @return stream of the elements that precede the first match of {@code predicate}
 */
public static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) {
    Spliterator<T> spliterator = stream.spliterator();
    return StreamSupport.stream(new Spliterator<T>() {
        // Becomes true once the predicate matched or the source ran dry; from then on
        // the source must not be touched again.
        private boolean finished = false;

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (finished) {
                // Without this guard a repeated call would resume emitting the
                // elements that follow the stop point.
                return false;
            }
            boolean advanced = spliterator.tryAdvance(item -> {
                if (predicate.test(item)) {
                    finished = true;
                } else {
                    action.accept(item);
                }
            });
            if (!advanced) {
                finished = true;
            }
            return !finished;
        }

        @Override
        public Spliterator<T> trySplit() {
            // Splitting would break the "prefix of the stream" semantics.
            return null;
        }

        @Override
        public long estimateSize() {
            // The source size is an upper bound; the exact prefix length is unknown.
            return finished ? 0 : spliterator.estimateSize();
        }

        @Override
        public int characteristics() {
            // A prefix keeps these properties of the source; size is no longer known.
            return spliterator.characteristics()
                    & (Spliterator.ORDERED | Spliterator.DISTINCT | Spliterator.NONNULL);
        }
    }, false);
}
1
2
3
/** Demo for {@code takeWhile}: prints the elements in front of the first empty string. */
public static void main(String[] args) {
    Stream<String> letters = Stream.of("a", "b", "", "c", "d");
    Stream<String> head = takeWhile(letters, String::isEmpty);
    // Expected console output: ab
    head.forEach(System.out::print);
}

dropWhile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * Returns a sequential stream that skips the leading elements of {@code stream} up to
 * and including the first element for which {@code predicate} returns {@code true},
 * and then yields every remaining element.
 *
 * <p>If the predicate never matches, the returned stream is empty. Note that this
 * differs from Java 9's {@code Stream.dropWhile}, which drops while its predicate is
 * true and keeps the first non-matching element.
 *
 * @param stream    source stream; it is consumed through its spliterator
 * @param predicate start trigger, tested once per element until it first matches
 * @param <T>       element type
 * @return stream of the elements that follow the first match of {@code predicate}
 */
public static <T> Stream<T> dropWhile(Stream<T> stream, Predicate<? super T> predicate) {
    Spliterator<T> spliterator = stream.spliterator();
    return StreamSupport.stream(new Spliterator<T>() {
        // False while elements are still being discarded.
        private boolean canGo = false;

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            // Discard inside a single call: tryAdvance may only return true when the
            // action has actually received an element, otherwise consumers such as
            // Stream.iterator() see a premature end of data.
            while (!canGo) {
                boolean advanced = spliterator.tryAdvance(item -> canGo = predicate.test(item));
                if (!advanced) {
                    return false;
                }
            }
            return spliterator.tryAdvance(action);
        }

        @Override
        public Spliterator<T> trySplit() {
            // Splitting would break the "suffix of the stream" semantics.
            return null;
        }

        @Override
        public long estimateSize() {
            // The source size is an upper bound on what is left to emit.
            return spliterator.estimateSize();
        }

        @Override
        public int characteristics() {
            // A suffix keeps these properties of the source; size is no longer known.
            return spliterator.characteristics()
                    & (Spliterator.ORDERED | Spliterator.DISTINCT | Spliterator.NONNULL);
        }
    }, false);
}
1
2
3
/** Demo for {@code dropWhile}: prints the elements that come after the first empty string. */
public static void main(String[] args) {
    Stream<String> letters = Stream.of("a", "b", "", "c", "d");
    Stream<String> tail = dropWhile(letters, String::isEmpty);
    // Expected console output: cd
    tail.forEach(System.out::print);
}

在同一个流中执行多个操作

思路大概是这样的

StreamForker 记录对流的操作,通过ForkerConsumer将原来流中的元素分发生成多个子流(有几个操作,就有几个子流),在子流中调用用户的操作function

具体实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
/**
 * Runs several independent operations over the elements of one stream while
 * traversing that stream only once.
 *
 * <p>Each operation registered with {@link #fork} gets its own sub-stream backed by a
 * {@link BlockingQueue}. {@link #getResults()} walks the source stream on the calling
 * thread and copies every element into every queue, while each operation is evaluated
 * asynchronously via {@link CompletableFuture#supplyAsync}.
 *
 * <p>Elements travel through {@link BlockingQueue}s, which do not accept {@code null},
 * so the source stream must not contain {@code null} elements.
 *
 * @param <T> element type of the source stream
 */
public class StreamForker<T> {
    private final Stream<T> stream;
    // Operations registered so far, keyed by the caller-chosen identifier.
    private final Map<Object, Function<Stream<T>, ?>> actionMap = new HashMap<>();

    /**
     * @param stream the source stream whose elements are distributed to every fork
     */
    public StreamForker(Stream<T> stream) {
        this.stream = stream;
    }

    /**
     * Registers an operation to run on a copy of the stream.
     *
     * @param key      identifier used later to look the result up; registering the same
     *                 key again replaces the earlier operation
     * @param function operation applied to the sub-stream
     * @return this forker, to allow chaining
     */
    public StreamForker<T> fork(Object key, Function<Stream<T>, ?> function) {
        actionMap.put(key, function);
        return this;
    }

    /**
     * Starts all registered operations and feeds them the source stream.
     *
     * @return handle from which each operation's result can be fetched by key
     */
    public Results getResults() {
        ForkerConsumer<T> consumer = build();
        try {
            // Push every source element into the consumer, which fans it out to the sub-streams.
            stream.sequential().forEach(consumer);
        } finally {
            // Always signal end-of-stream so that no sub-stream waits forever,
            // even if the traversal above failed.
            consumer.finish();
        }
        return consumer;
    }

    /**
     * Creates one queue-backed sub-stream per registered operation and starts
     * evaluating each operation asynchronously.
     *
     * @return consumer that distributes elements to the sub-streams and exposes results
     */
    public ForkerConsumer<T> build() {
        List<BlockingQueue<T>> queues = new ArrayList<>();
        Map<Object, Future<?>> futureMap = new HashMap<>();
        for (Map.Entry<Object, Function<Stream<T>, ?>> entry : actionMap.entrySet()) {
            futureMap.put(entry.getKey(), getResult0(queues, entry.getValue()));
        }
        return new ForkerConsumer<>(queues, futureMap);
    }

    // Creates the queue and sub-stream for one operation and starts it asynchronously.
    private Future<?> getResult0(List<BlockingQueue<T>> queues, Function<Stream<T>, ?> function) {
        BlockingQueue<T> queue = new LinkedBlockingQueue<>();
        queues.add(queue);
        Spliterator<T> spliterator = new SubSpliterator<>(queue);
        Stream<T> source = StreamSupport.stream(spliterator, false);
        return CompletableFuture.supplyAsync(() -> function.apply(source));
    }

    /** Access to the results of the forked operations. */
    public interface Results {
        /**
         * Waits for and returns the result of the operation registered under {@code key}.
         *
         * @param key identifier passed to {@code fork}
         * @param <R> expected result type; the cast is unchecked
         * @return the operation's result
         */
        <R> R get(Object key);
    }

    /** Spliterator that reads elements from a queue until the end marker arrives. */
    static class SubSpliterator<T> implements Spliterator<T> {
        private final BlockingQueue<T> queue;
        // Set once the end marker has been read; nothing more will ever arrive after it.
        private boolean finished;

        public SubSpliterator(BlockingQueue<T> queue) {
            this.queue = queue;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (finished) {
                // Polling the queue again would block forever.
                return false;
            }
            T t = takeUninterruptibly();
            if (t == ForkerConsumer.END_OF_STREAM) {
                finished = true;
                return false;
            }
            action.accept(t);
            return true;
        }

        // Blocks until an element is available; an interrupt received while waiting is
        // remembered and re-asserted on the thread instead of being swallowed.
        private T takeUninterruptibly() {
            boolean interrupted = false;
            try {
                while (true) {
                    try {
                        return queue.take();
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        @Override
        public Spliterator<T> trySplit() {
            return null;
        }

        @Override
        public long estimateSize() {
            // The number of elements still to come is unknown.
            return Long.MAX_VALUE;
        }

        @Override
        public int characteristics() {
            return 0;
        }
    }

    /** Fans each accepted element out to every sub-stream queue and holds the pending results. */
    static class ForkerConsumer<T> implements Consumer<T>, Results {
        // Sentinel compared by identity to mark the end of the element sequence.
        static final Object END_OF_STREAM = new Object();
        private final List<BlockingQueue<T>> queues;
        // Pending result of each forked operation, by key.
        private final Map<Object, Future<?>> results;

        public ForkerConsumer(List<BlockingQueue<T>> queues, Map<Object, Future<?>> results) {
            this.queues = queues;
            this.results = results;
        }

        void finish() {
            // The marker is only ever compared by identity and never handed to user
            // code as a T, so the unchecked cast is safe.
            @SuppressWarnings("unchecked")
            T endMarker = (T) END_OF_STREAM;
            accept(endMarker);
        }

        @Override
        public <R> R get(Object key) {
            Future<?> future = results.get(key);
            if (future == null) {
                throw new NullPointerException("no fork registered for key: " + key);
            }
            try {
                // The caller chooses R; a mismatch surfaces as a ClassCastException at the call site.
                @SuppressWarnings("unchecked")
                R value = (R) future.get();
                return value;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void accept(T t) {
            // Hand the element to the queue of every sub-stream.
            queues.forEach(q -> q.offer(t));
        }
    }

    /** Demo: runs three operations over one list in a single traversal. */
    public static void main(String[] args) {
        List<String> list = Arrays.asList("test", "jiang", "an", "wei");
        StreamForker.Results results = new StreamForker<>(list.stream())
                .fork("test1", stringStream -> stringStream.sorted().filter(x -> x.equals("test")).findAny())
                .fork("count", Stream::count)
                .fork("join", stringStream -> stringStream.collect(Collectors.joining(",")))
                .getResults();
        Optional<String> test1 = results.get("test1");
        System.out.println(test1.get());
        System.out.println("" + results.get("count"));
        System.out.println("" + results.get("join"));
    }
}